package org.example.stream;

import java.util.concurrent.Flow;

/**
 * 发布订阅模型
 * 发布者所有消息都会被订阅者处理
 */
public class SomeSubscriber implements Flow.Subscriber<Integer> {


    private String someSubscriberName;

    public SomeSubscriber(String someSubscriberName) {
        this.someSubscriberName = someSubscriberName;
    }

    // 声明订阅对象
    private Flow.Subscription subscription;

    // 订阅者中第一个被执行方法
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println(this.someSubscriberName + "：订阅Flow 执行onSubscribe方法");
        this.subscription = subscription;
        //首次请求8条消息
        this.subscription.request(8);
    }

    // 订阅者接收到消息，方法触发一次
    public void onNext(Integer item) {
        System.out.println(this.someSubscriberName + "：订阅者处理消息数据为" + item);

        // 消费一条消息，再订阅10条消息
        this.subscription.request(10);
    }

    // 订阅，消费过程中存在异常
    public void onError(Throwable throwable) {
        System.out.println("onError ");
        throwable.printStackTrace();
        // 取消订阅关系
        this.subscription.cancel();
    }

    // 消息处理完毕后触发
    public void onComplete() {
        System.out.println(this.someSubscriberName + "：发布者已关闭，订阅者将所有消息处理完事件");
    }
}
